package handler

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"gorm.io/gorm"
	"os"
	. "service/order_srv/global"
	"service/order_srv/initialize"
	"service/order_srv/model"
	"service/order_srv/proto"
	"strconv"
	"sync/atomic"
	"time"
)

// 实现商品服务（service层）的接口
// 接口定义文件——order.proto

type OrderServer struct{}

//购物车
//CartItemList(context.Context, *UserInfo) (*CartItemListResponse, error)
//CreateCartItem(context.Context, *CartItemRequest) (*ShopCartInfoResponse, error)
//UpdateCartItem(context.Context, *CartItemRequest) (*emptypb.Empty, error)
//DeleteCartItem(context.Context, *CartItemRequest) (*emptypb.Empty, error)

//订单
//CreateOrder(context.Context, *OrderRequest) (*OrderInfoResponse, error)
//OrderList(context.Context, *OrderFilterRequest) (*OrderListResponse, error)
//OrderDetail(context.Context, *OrderRequest) (*OrderInfoDetailResponse, error)
//UpdateOrderStatus(context.Context, *OrderStatus) (*emptypb.Empty, error)

// 根据用户id获取购物车中的商品列表
func (o *OrderServer) CartItemList(c context.Context, r *proto.UserInfo) (*proto.CartItemListResponse, error) {
	var cartItems []*model.ShoppingCart
	res := DB.Where("user", r.Id).Find(&cartItems)
	if res.Error != nil {
		zap.S().Errorf("查询用户%d的购物车中商品列表时出错", r.Id)
		return nil, status.Errorf(codes.Internal, "查询用户%d的购物车中商品列表时出错", r.Id)
	}
	if res.RowsAffected == 0 {
		zap.S().Errorf("用户%d的购物车是空的", r.Id)
		return nil, status.Errorf(codes.NotFound, "用户%d的购物车是空的", r.Id)
	}

	cartItemListResponse := new(proto.CartItemListResponse)
	cartItemListResponse.Total = int32(res.RowsAffected)
	for _, item := range cartItems {
		// gorm可以自动将bool类型的值转化为mysql中tinyint类型的值
		cartItemListResponse.Data = append(cartItemListResponse.Data, &proto.ShopCartInfoResponse{
			Id:      item.ID,
			UserId:  item.User,
			GoodsId: item.Goods,
			Nums:    item.Nums,
			Checked: item.Checked,
		})
	}
	return cartItemListResponse, nil
}

// 向用户的购物车中添加一条商品记录
func (o *OrderServer) CreateCartItem(c context.Context, r *proto.CartItemRequest) (*proto.ShopCartInfoResponse, error) {
	// 针对这个接口而言，r中只有UserId、GoodsId、Nums是有效字段。首次添加商品后购物车中的该商品默认被勾选
	cartItem := &model.ShoppingCart{User: r.UserId, Goods: r.GoodsId}
	DB.Where(cartItem).First(cartItem)
	cartItem.Nums += r.Nums
	cartItem.Checked = true
	tx := DB.Begin()
	// 无则新增，有则更新
	if saveRes := tx.Save(cartItem); saveRes.Error != nil {
		tx.Rollback()
		zap.S().Errorf("购物车添加记录时出错：%v。用户id：%d，商品id:%d", saveRes.Error, r.UserId, r.GoodsId)
		return nil, status.Errorf(codes.Internal, "购物车添加记录时出错：%v。用户id：%d，商品id:%d", saveRes.Error, r.UserId, r.GoodsId)
	}
	tx.Commit()
	return &proto.ShopCartInfoResponse{
		Id:      cartItem.ID,
		UserId:  cartItem.User,
		GoodsId: cartItem.Goods,
		Nums:    cartItem.Nums,
		Checked: cartItem.Checked,
	}, nil
}

// 更新购物车中的记录
func (o *OrderServer) UpdateCartItem(c context.Context, r *proto.CartItemRequest) (*emptypb.Empty, error) {
	// 针对这个接口而言，r中UserId、GoodsId、Nums、checked是有效字段
	if r.Nums <= 0 {
		zap.S().Error("商品数量必须大于0")
		return nil, status.Errorf(codes.InvalidArgument, "商品数量必须大于0")
	}
	cartItem := &model.ShoppingCart{User: r.UserId, Goods: r.GoodsId}
	if res := DB.Where(cartItem).First(cartItem); res.RowsAffected == 0 {
		zap.S().Errorf("购物车中记录不存在。用户id：%d，商品id：%d", r.UserId, r.GoodsId)
		return nil, status.Errorf(codes.NotFound, "购物车中记录不存在。用户id：%d，商品id：%d", r.UserId, r.GoodsId)
	}
	cartItem.Nums = r.Nums
	cartItem.Checked = r.Checked
	tx := DB.Begin()
	if res := tx.Save(cartItem); res.Error != nil {
		tx.Rollback()
		zap.S().Errorf("更新购物车记录时出错：%v。用户id：%d，商品id:%d", res.Error, r.UserId, r.GoodsId)
		return nil, status.Errorf(codes.Internal, "更新购物车记录时出错：%v。用户id：%d，商品id:%d", res.Error, r.UserId, r.GoodsId)
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

// 删除购物车中的记录
func (o *OrderServer) DeleteCartItem(c context.Context, r *proto.CartItemRequest) (*emptypb.Empty, error) {
	// 针对这个接口而言，r中只有Id是有效字段
	cartItem := new(model.ShoppingCart)
	cartItem.ID = r.Id

	result := DB.Where(cartItem).First(cartItem)
	if result.RowsAffected != 1 {
		zap.S().Errorf("购物车记录%d不存在", r.Id)
		return nil, status.Errorf(codes.NotFound, "购物车记录%d不存在", r.Id)
	}

	result = DB.Delete(cartItem)
	if result.Error != nil {
		return nil, status.Errorf(codes.Internal, result.Error.Error())
	}
	return &empty.Empty{}, nil
}

// 用于发送归还库存的事务消息
type OrderListener struct {
	OrderInfo *model.OrderInfo
	Ctx       context.Context // 传递上下文中的span信息
	Code      codes.Code
	ErrName   string
	ErrInfo   error
}

// 消息事务
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	topSpan := opentracing.SpanFromContext(o.Ctx)
	span1 := opentracing.StartSpan("gorm获取当前用户的购物车中已勾选的商品id", opentracing.ChildOf(topSpan.Context()))
	var cartItems []*model.ShoppingCart
	// 1.获取当前用户的购物车中已勾选的商品id
	if res := DB.Where(&model.ShoppingCart{User: o.OrderInfo.User, Checked: true}).Find(&cartItems); res.RowsAffected == 0 {
		o.Code = codes.NotFound
		o.ErrName = "未选中结算商品"
		o.ErrInfo = res.Error
		// 未进行库存扣减，出现错误不执行库存归还
		return primitive.RollbackMessageState
	}
	span1.Finish()

	var goodsIds []int32

	// 购物车中商品id与商品数量的映射关系，用于计算购物车中商品的总价
	m := make(map[int32]int32, 10)
	// sellInfo用于请求库存扣减服务
	sellInfo := new(proto.SellInfo)
	sellInfo.OrderSn = o.OrderInfo.OrderSn
	for _, item := range cartItems {
		m[item.Goods] = item.Nums
		goodsIds = append(goodsIds, item.Goods)
		sellInfo.GoodsInfo = append(sellInfo.GoodsInfo, &proto.GoodsInvInfo{
			GoodsId: item.Goods,
			Num:     item.Nums,
		})
	}

	// 2.根据商品id数组批量查询出各个商品的信息
	grpcCtxWithSpan := opentracing.ContextWithSpan(o.Ctx, topSpan)
	resGoods, err := GoodsSrvClient.BatchGetGoods(grpcCtxWithSpan, &proto.BatchGoodsIdInfo{Id: goodsIds})
	if err != nil {
		o.Code = codes.Internal
		o.ErrName = "批量查询商品信息失败"
		o.ErrInfo = err
		// 未进行库存扣减，出现错误不执行库存归还
		return primitive.RollbackMessageState
	}

	// 3.调用库存微服务进行库存扣减
	if _, err = InventorySrvClient.Sell(grpcCtxWithSpan, sellInfo); err != nil {
		// 可能出现库存扣减成功，但由于网络拥塞，导致这里err中出现错误。进而导致库存成功扣减但订单没有创建的情况。
		// 解决办法就是仅针对可预期的错误类型执行rollback，也就是说不处理网络拥塞导致响应超时的错误。
		if e, ok := status.FromError(err); ok {
			if e.Code() == codes.Internal || e.Code() == codes.InvalidArgument || e.Code() == codes.ResourceExhausted {
				o.Code = codes.Internal
				o.ErrName = "扣减库存失败"
				o.ErrInfo = err
				// 未进行库存扣减，出现错误不执行库存归还
				return primitive.RollbackMessageState
			}
		}
	}

	// 库存扣减成功
	// 模拟超时场景，测试rmq回查机制以及inventory的自动归还库存功能是否正常
	// time.Sleep(time.Hour)

	// 订单中所有商品的总价
	var orderTotalPrice float32
	// 此订单下所有商品的详细信息
	var orderGoods []*model.OrderGoods
	for _, good := range resGoods.Data {
		orderTotalPrice += good.ShopPrice * float32(m[good.Id])
		orderGoods = append(orderGoods, &model.OrderGoods{
			// 订单尚未创建成功，这里不能确定订单的主键id，待订单创建成功后再更新这里的订单主键id
			//Order:      0,
			Goods:      good.Id,
			GoodsName:  good.Name,
			GoodsImage: good.GoodsFrontImage,
			GoodsPrice: good.ShopPrice,
			Nums:       m[good.Id],
		})
	}

	// 4.生成订单信息并写入订单表
	o.OrderInfo.OrderMount = orderTotalPrice

	// 开启本地事务
	tx := DB.Begin()
	span2 := opentracing.StartSpan("gorm向OrderInfo表中新建订单记录", opentracing.ChildOf(topSpan.Context()))
	if res := tx.Save(o.OrderInfo); res.Error != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "创建订单时出错"
		o.ErrInfo = res.Error
		// 库存扣减已完成，出现错误执行库存归还
		return primitive.CommitMessageState
	}
	span2.Finish()

	// 更新订单下所有商品的订单id
	for i := range orderGoods {
		orderGoods[i].Order = o.OrderInfo.ID
	}

	// 5.向orderGoods表批量插入数据
	span3 := opentracing.StartSpan("gorm向orderGoods表批量插入数据", opentracing.ChildOf(topSpan.Context()))
	if res := tx.CreateInBatches(orderGoods, 100); res.Error != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "批量创建订单中商品信息时出错"
		o.ErrInfo = res.Error
		// 库存扣减已完成，出现错误执行库存归还
		return primitive.CommitMessageState
	}
	span3.Finish()

	span4 := opentracing.StartSpan("gorm删除购物车中已勾选的商品记录", opentracing.ChildOf(topSpan.Context()))
	// 6.删除购物车中已勾选的商品记录
	if res := tx.Where(&model.ShoppingCart{User: o.OrderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); res.Error != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "批量创建订单中商品信息时出错"
		o.ErrInfo = res.Error
		// 库存扣减已完成，出现错误执行库存归还
		return primitive.CommitMessageState
	}
	span4.Finish()

	// 创建订单之后，提交事务之前，向rmq发送一个延时消息
	span5 := opentracing.StartSpan("创建订单之后，提交事务之前，向rmq发送一个延时消息", opentracing.ChildOf(topSpan.Context()))
	p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"127.0.0.1:9876"}))
	if err != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "实例化producer失败"
		o.ErrInfo = err
		// 库存扣减完成，延时消息发送失败，创建订单的事务回滚，则直接归还库存。
		// 这里出现错误后，不会返回支付url
		return primitive.CommitMessageState
	}

	if err = p.Start(); err != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "启动producer失败"
		o.ErrInfo = err
		return primitive.CommitMessageState
	}

	delayMsg := primitive.NewMessage(
		// topic不存在则会自动创建
		"order_timeout",
		[]byte(o.OrderInfo.OrderSn))
	// 测试时延时30s
	//delayMsg.WithDelayTimeLevel(4)
	// 正式环境延时30min
	delayMsg.WithDelayTimeLevel(16)
	_, err = p.SendSync(context.Background(), delayMsg)
	if err != nil {
		tx.Rollback()
		o.Code = codes.Internal
		o.ErrName = "发送延时消息失败"
		o.ErrInfo = err
		return primitive.CommitMessageState
	} else {
		zap.S().Debug("延时消息发送成功")
	}
	span5.Finish()

	tx.Commit()

	// 本地事务提交完成，订单创建成功，库存扣减完成，不执行库存归还。
	// 半消息提交 这里无论返回commit还是rollback，rmq都不会执行回查的方法，
	// 如果程序执行不到这里，例如出现无缘无故的失败情况，代码异常或者宕机，那么rmq的回查机制就会发挥作用
	return primitive.RollbackMessageState
}

// rmq回查
// 如果回查时出现了宕机，那么服务下次启动时，rmq依然会发来回查请求
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	zap.S().Debug("启动消息回查")
	order := new(model.OrderInfo)
	if res := DB.Where(model.OrderInfo{OrderSn: string(msg.Body)}).First(order); res.RowsAffected == 1 {
		// 创建订单成功，不归还库存
		zap.S().Debugf("消息回查结果：订单%s创建成功，不向rmq提交归还库存的消息", order.OrderSn)
		return primitive.RollbackMessageState
	}

	// 创建订单失败，提交消息。
	// 这里创建订单失败有两种情况：
	// 	1.库存还没有扣减成功，也没有开始创建订单。
	// 	2.库存已经扣减，但是创建订单失败。
	// 不论是哪种情况，这里都可以提交消息，但是在消费的一端就必须做好幂等性——判断库存是否扣减成功。
	// 如果扣减成功则消费消息，归还库存。
	// 如果库存没有扣减成功，则只需消费消息，不必归还库存。
	zap.S().Debugf("消息回查结果：订单%s创建失败，向rmq提交归还库存的消息", order.OrderSn)
	return primitive.CommitMessageState
}

// 创建订单
// 这是订单服务中最重要也是最复杂的接口
func (o *OrderServer) CreateOrder(c context.Context, r *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
	orderSn := GenerateOrder()
	// 将请求信息放到orderListener中，方便ExecuteLocalTransaction方法进行处理
	orderListener := &OrderListener{
		OrderInfo: &model.OrderInfo{
			User:         r.UserId,
			OrderSn:      orderSn,
			Address:      r.Address,
			SignerName:   r.Name,
			SingerMobile: r.Mobile,
			Post:         r.Post,
		},
		// 不可以直接用等号赋值，orderListener.Ctx = c 是错误的
		Ctx: c,
	}

	// 由于每个请求中的orderListener都不一样，所以每次都要重新连到mq
	p, err := initialize.InitTransactionProducer(orderListener)
	if err != nil {
		zap.S().Error("初始化producer失败：", err)
		return nil, status.Errorf(codes.Internal, "初始化producer失败：%v", err)
	}

	// p.SendMessageInTransaction()方法中首先尝试发送半消息，err是发送半消息时返回的。
	// 随后执行ExecuteLocalTransaction方法，如果ExecuteLocalTransaction方法长期没有返回，
	// 则定期自动执行回查方法CheckLocalTransaction。
	_, err = p.SendMessageInTransaction(context.Background(), primitive.NewMessage(
		// topic不存在则会自动创建
		"order_reback",
		// 消息内容只传订单编号
		[]byte(orderSn)))
	if err != nil {
		zap.S().Error("向rocketMQ发送半消息失败：", err)
		return nil, status.Errorf(codes.Internal, "向rocketMQ发送半消息失败：%v", err)
	}

	if orderListener.ErrName != "" {
		zap.S().Error(orderListener.ErrName, orderListener.ErrInfo)
		return nil, status.Errorf(orderListener.Code, orderListener.ErrName)
	}

	return &proto.OrderInfoResponse{
		Id:      orderListener.OrderInfo.ID,
		OrderSn: orderSn,
		Total:   orderListener.OrderInfo.OrderMount,
	}, nil
}

// 获取用户的订单列表
func (o *OrderServer) OrderList(c context.Context, r *proto.OrderFilterRequest) (*proto.OrderListResponse, error) {
	topSpan := opentracing.SpanFromContext(c)

	getOrderListSpan := opentracing.StartSpan("gorm获取订单列表", opentracing.ChildOf(topSpan.Context()))
	var orders []*model.OrderInfo
	res := DB.Where(model.OrderInfo{User: r.UserId}).Scopes(Paginate(int(r.Pages), int(r.PagePerNums))).Find(&orders)
	if res.Error != nil {
		zap.S().Errorf("用户%d获取订单列表时出错：%v", r.UserId, res.Error.Error())
		return nil, status.Errorf(codes.Internal, "用户%d获取订单列表时出错", r.UserId)
	}
	if res.RowsAffected == 0 {
		zap.S().Errorf("用户%d的订单记录为空", r.UserId)
		return nil, status.Errorf(codes.Internal, "用户%d的订单记录为空", r.UserId)
	}
	getOrderListSpan.Finish()

	createRspSpan := opentracing.StartSpan("构造相应结果", opentracing.ChildOf(topSpan.Context()))
	orderListResponse := new(proto.OrderListResponse)
	orderListResponse.Total = int32(res.RowsAffected)
	for _, order := range orders {
		orderListResponse.Data = append(orderListResponse.Data, &proto.OrderInfoResponse{
			Id:      order.ID,
			UserId:  order.User,
			OrderSn: order.OrderSn,
			PayType: order.PayType,
			Status:  order.Status,
			Post:    order.Post,
			Total:   order.OrderMount,
			Address: order.Address,
			Name:    order.SignerName,
			Mobile:  order.SingerMobile,
			AddTime: order.CreatedAt.Format("2006-01-02-15-04-05"),
		})
	}
	createRspSpan.Finish()

	return orderListResponse, nil
}

// 获取订单详情
func (o *OrderServer) OrderDetail(c context.Context, r *proto.OrderRequest) (*proto.OrderInfoDetailResponse, error) {
	order := new(model.OrderInfo)
	if res := DB.Where("id", r.Id).First(order); res.Error != nil {
		zap.S().Errorf("获取订单%d的详情时出错：%v", r.Id, res.Error.Error())
		return nil, status.Errorf(codes.NotFound, "获取订单%d的详情时出错", r.Id)
	}

	var orderGoodsList []*model.OrderGoods
	if res := DB.Where("order", r.Id).Find(&orderGoodsList); res.Error != nil {
		zap.S().Errorf("获取订单%d的商品信息时出错：%v", r.Id, res.Error.Error())
		return nil, status.Errorf(codes.Internal, "获取订单%d的商品信息时出错", r.Id)
	}

	orderInfoDetailResponse := new(proto.OrderInfoDetailResponse)
	orderInfoDetailResponse.OrderInfo = &proto.OrderInfoResponse{
		Id:      order.ID,
		UserId:  order.User,
		OrderSn: order.OrderSn,
		PayType: order.PayType,
		Status:  order.Status,
		Post:    order.Post,
		Total:   order.OrderMount,
		Address: order.Address,
		Name:    order.SignerName,
		Mobile:  order.SingerMobile,
		AddTime: order.CreatedAt.Format("2006-01-02-15-04-05"),
	}
	for _, orderGoods := range orderGoodsList {
		orderInfoDetailResponse.Goods = append(orderInfoDetailResponse.Goods, &proto.OrderItemResponse{
			Id:         orderGoods.ID,
			OrderId:    orderGoods.Order,
			GoodsId:    orderGoods.Goods,
			GoodsName:  orderGoods.GoodsName,
			GoodsImage: orderGoods.GoodsImage,
			GoodsPrice: orderGoods.GoodsPrice,
			Nums:       orderGoods.Nums,
		})
	}

	return orderInfoDetailResponse, nil
}

// 更新订单状态
func (o *OrderServer) UpdateOrderStatus(c context.Context, r *proto.OrderStatus) (*emptypb.Empty, error) {
	//先查询，再更新 实际上有两条sql执行， select 和 update语句
	if res := DB.Model(&model.OrderInfo{}).Where("order_sn = ?", r.OrderSn).Update("status", r.Status); res.RowsAffected == 0 {
		zap.S().Errorf("订单%s不存在。详细原因：%v", r.OrderSn, res.Error.Error())
		return nil, status.Errorf(codes.NotFound, "订单%s不存在", r.OrderSn)
	}
	return &emptypb.Empty{}, nil
}

var num int64

// 生成27位订单号，前面20位代表时间精确到微秒，中间3位代表进程id，最后4位代表序号
func GenerateOrder() string {
	t := TimeFormatUs()
	p := os.Getpid() % 1000
	ps := sup(int64(p), 3)
	i := atomic.AddInt64(&num, 1)
	r := i % 10000
	rs := sup(r, 4)
	n := fmt.Sprintf("%s%s%s", t, ps, rs)
	if num > 9999999999 {
		num = 0
	}
	return n
}

// 对长度不足n的数字前面补0
func sup(i int64, n int) string {
	msStr := strconv.FormatInt(i, 10)
	for len(msStr) < n {
		msStr = "0" + msStr
	}
	return msStr
}

// 获取当前时间的“年月日时分秒毫秒微秒”格式
func TimeFormatUs() string {
	// 当前时间对象
	curTime := time.Now()
	// curTime的时间戳（秒）
	unixS := curTime.Unix()
	// curTime的时间戳（毫秒）
	unixMs := curTime.UnixNano() / 1e6
	// curTime的时间戳（微秒）
	unixUs := curTime.UnixNano() / 1e3
	// 毫秒时间
	timeMs := unixMs - unixS*1e3
	// 如果毫秒数不够三位的话，则在前面补0
	msStr := sup(timeMs, 3)
	// 微妙时间
	timeUs := unixUs - unixMs*1e3
	// 如果微秒数不够三位的话，则在前面补0
	usStr := sup(timeUs, 3)
	// curTime的日期格式
	dateStr := curTime.Format("20060102150405")

	return dateStr + msStr + usStr
}

// 优雅实现分页
func Paginate(page, pageSize int) func(db *gorm.DB) *gorm.DB {
	return func(db *gorm.DB) *gorm.DB {
		if page == 0 {
			page = 1
		}

		switch {
		case pageSize > 100:
			pageSize = 100
		case pageSize <= 0:
			pageSize = 10
		}

		offset := (page - 1) * pageSize
		return db.Offset(offset).Limit(pageSize)
	}
}

// 处理订单超时
func HandelOrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	for _, msg := range msgs {
		// 获取rmq中的消息——订单编号
		orderSn := string(msg.Body)
		zap.S().Debug("消费端读到消息：", orderSn)
		// 查询订单详情表
		orderInfo := new(model.OrderInfo)

		res := DB.Where(model.OrderInfo{OrderSn: orderSn}).First(orderInfo)
		if res.RowsAffected == 1 && orderInfo.Status != "TRADE_SUCCESS" {
			// 若在订单详情表中可以找到该订单，并且订单的支付状态不是已支付"，则应归还库存

			// 修改订单状态为"超时关闭"
			tx := DB.Begin()
			orderInfo.Status = "TRADE_CLOSED"
			result := tx.Where(model.OrderInfo{OrderSn: orderSn}).Select("status").Updates(orderInfo)
			if result.Error != nil {
				tx.Rollback()
				return consumer.ConsumeRetryLater, result.Error
			}

			// 归还方法是向topic：order_reback中发送消息
			p, err := initialize.InitSimpleProducer()
			if err != nil {
				tx.Rollback()
				return consumer.ConsumeRetryLater, err
			}

			_, err = p.SendSync(context.Background(), primitive.NewMessage(
				// topic不存在则会自动创建
				"order_reback",
				[]byte(orderSn)))
			if err != nil {
				tx.Rollback()
				return consumer.ConsumeRetryLater, err
			}

			tx.Commit()
			zap.S().Debugf("订单%s超时，库存归还成功", orderSn)
		}
		// 找不到订单或者订单已支付，直接消费该消息，不执行库存归还
		return consumer.ConsumeSuccess, nil
	}
	return consumer.ConsumeSuccess, nil
}
